[[pause-resume]]
= Pausing and Resuming Listener Containers

Version 2.1.3 added `pause()` and `resume()` methods to listener containers.
Previously, you could pause a consumer within a `ConsumerAwareMessageListener` and resume it by listening for a `ListenerContainerIdleEvent`, which provides access to the `Consumer` object.
While you could pause a consumer in an idle container by using an event listener, in some cases, this was not thread-safe, since there is no guarantee that the event listener is invoked on the consumer thread.
To safely pause and resume consumers, you should use the `pause` and `resume` methods on the listener containers.
A `pause()` takes effect just before the next `poll()`; a `resume()` takes effect just after the current `poll()` returns.
When a container is paused, it continues to `poll()` the consumer, avoiding a rebalance if group management is being used, but it does not retrieve any records.
See the Kafka documentation for more information.

Starting with version 2.1.5, you can call `isPauseRequested()` to see if `pause()` has been called.
However, the consumers might not have actually paused yet.
`isConsumerPaused()` returns true if all `Consumer` instances have actually paused.

In addition(also since 2.1.5), `ConsumerPausedEvent` and `ConsumerResumedEvent` instances are published with the container as the `source` property and the `TopicPartition` instances involved in the `partitions` property.

Starting with version 2.9, a new container property `pauseImmediate`, when set to true, causes the pause to take effect after the current record is processed.
By default, the pause takes effect when all the records from the previous poll have been processed.
See xref:kafka/container-props.adoc#pauseImmediate[pauseImmediate].

The following simple Spring Boot application demonstrates by using the container registry to get a reference to a `@KafkaListener` method's container and pausing or resuming its consumers as well as receiving the corresponding events:

[source, java]
----
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}
----

The following listing shows the results of the preceding example:

[source]
----
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2
----

